3  * Copyright 2017 gRPC authors.
 
   5  * Licensed under the Apache License, Version 2.0 (the "License");
 
   6  * you may not use this file except in compliance with the License.
 
   7  * You may obtain a copy of the License at
 
   9  *     http://www.apache.org/licenses/LICENSE-2.0
 
  11  * Unless required by applicable law or agreed to in writing, software
 
  12  * distributed under the License is distributed on an "AS IS" BASIS,
 
  13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  14  * See the License for the specific language governing permissions and
 
  15  * limitations under the License.
 
  19 #include <grpc/support/port_platform.h>
 
  21 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
 
  23 #include <grpc/support/atm.h>
 
  24 #include <grpc/support/log.h>
 
  26 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
 
  27 #include "src/core/lib/iomgr/error.h"
 
  28 #include "src/core/lib/profiling/timers.h"
 
  30 static grpc_error* init_channel_elem(grpc_channel_element* elem,
 
  31                                      grpc_channel_element_args* args) {
 
  32   return GRPC_ERROR_NONE;
 
  35 static void destroy_channel_elem(grpc_channel_element* elem) {}
 
  40   // Stats object to update.
 
  41   grpc_core::RefCountedPtr<grpc_core::GrpcLbClientStats> client_stats;
 
  42   // State for intercepting send_initial_metadata.
 
  43   grpc_closure on_complete_for_send;
 
  44   grpc_closure* original_on_complete_for_send;
 
  45   bool send_initial_metadata_succeeded = false;
 
  46   // State for intercepting recv_initial_metadata.
 
  47   grpc_closure recv_initial_metadata_ready;
 
  48   grpc_closure* original_recv_initial_metadata_ready;
 
  49   bool recv_initial_metadata_succeeded = false;
 
  54 static void on_complete_for_send(void* arg, grpc_error* error) {
 
  55   call_data* calld = static_cast<call_data*>(arg);
 
  56   if (error == GRPC_ERROR_NONE) {
 
  57     calld->send_initial_metadata_succeeded = true;
 
  59   GRPC_CLOSURE_RUN(calld->original_on_complete_for_send, GRPC_ERROR_REF(error));
 
  62 static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
 
  63   call_data* calld = static_cast<call_data*>(arg);
 
  64   if (error == GRPC_ERROR_NONE) {
 
  65     calld->recv_initial_metadata_succeeded = true;
 
  67   GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready,
 
  68                    GRPC_ERROR_REF(error));
 
  71 static grpc_error* init_call_elem(grpc_call_element* elem,
 
  72                                   const grpc_call_element_args* args) {
 
  73   GPR_ASSERT(args->context != nullptr);
 
  74   new (elem->call_data) call_data();
 
  75   return GRPC_ERROR_NONE;
 
  78 static void destroy_call_elem(grpc_call_element* elem,
 
  79                               const grpc_call_final_info* final_info,
 
  80                               grpc_closure* ignored) {
 
  81   call_data* calld = static_cast<call_data*>(elem->call_data);
 
  82   if (calld->client_stats != nullptr) {
 
  83     // Record call finished, optionally setting client_failed_to_send and
 
  85     calld->client_stats->AddCallFinished(
 
  86         !calld->send_initial_metadata_succeeded /* client_failed_to_send */,
 
  87         calld->recv_initial_metadata_succeeded /* known_received */);
 
  92 static void start_transport_stream_op_batch(
 
  93     grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
 
  94   call_data* calld = static_cast<call_data*>(elem->call_data);
 
  95   GPR_TIMER_SCOPE("clr_start_transport_stream_op_batch", 0);
 
  96   // Handle send_initial_metadata.
 
  97   if (batch->send_initial_metadata) {
 
  98     // Grab client stats object from user_data for LB token metadata.
 
  99     grpc_linked_mdelem* lb_token =
 
 100         batch->payload->send_initial_metadata.send_initial_metadata->idx.named
 
 102     if (lb_token != nullptr) {
 
 103       grpc_core::GrpcLbClientStats* client_stats =
 
 104           static_cast<grpc_core::GrpcLbClientStats*>(grpc_mdelem_get_user_data(
 
 105               lb_token->md, grpc_core::GrpcLbClientStats::Destroy));
 
 106       if (client_stats != nullptr) {
 
 107         calld->client_stats = client_stats->Ref();
 
 108         // Intercept completion.
 
 109         calld->original_on_complete_for_send = batch->on_complete;
 
 110         GRPC_CLOSURE_INIT(&calld->on_complete_for_send, on_complete_for_send,
 
 111                           calld, grpc_schedule_on_exec_ctx);
 
 112         batch->on_complete = &calld->on_complete_for_send;
 
 116   // Intercept completion of recv_initial_metadata.
 
 117   if (batch->recv_initial_metadata) {
 
 118     calld->original_recv_initial_metadata_ready =
 
 119         batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
 
 120     GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
 
 121                       recv_initial_metadata_ready, calld,
 
 122                       grpc_schedule_on_exec_ctx);
 
 123     batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
 
 124         &calld->recv_initial_metadata_ready;
 
 126   // Chain to next filter.
 
 127   grpc_call_next_op(elem, batch);
 
 130 const grpc_channel_filter grpc_client_load_reporting_filter = {
 
 131     start_transport_stream_op_batch,
 
 132     grpc_channel_next_op,
 
 135     grpc_call_stack_ignore_set_pollset_or_pollset_set,
 
 137     0,  // sizeof(channel_data)
 
 139     destroy_channel_elem,
 
 140     grpc_channel_next_get_info,
 
 141     "client_load_reporting"};